package main

import (
	"context"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"
)

type Message struct {
	id int
}

/*
   生产者
*/
type Producer struct {
	taskChannels []chan Message
	ctx          context.Context
	// 用于等待所有的生产者全部接受到停止信号停止处理
	wg *sync.WaitGroup
	// 所有生产者接受到停止信号的时候，只有一个生产者去关闭所有taskChannels
	once *sync.Once
}

func NewProducer(taskChannels []chan Message, ctx context.Context) *Producer {
	p := &Producer{taskChannels: taskChannels,
		ctx:  ctx,
		wg:   &sync.WaitGroup{},
		once: &sync.Once{}}
	return p
}

func (p *Producer) Produce(name string) {
	random := rand.New(rand.NewSource(1024))
	for {
		select {
		// 判断是否要退出了
		case <-p.ctx.Done():
			log.Printf("%s produce break start", name)
			// 通知通知监听者组任务的协程我受到取消信号了
			p.wg.Done()
			// 等待所有协程都处理完成
			p.wg.Wait()
			log.Printf("%s produce break done", name)
			p.once.Do(func() {
				// produce在监听到种终止事件之后关闭channel，所有的produce只执行一次
				for _, ch := range p.taskChannels {
					// 关闭所有持有的channel
					close(ch)
				}
			})
			// 这个之后的代码是否能执行到就不一定了，因为channel关闭就可能导致主协程wait终止掉

			return
		default:
			// 模拟生产者获取数据
			num := random.Intn(len(p.taskChannels))
			// 间隔一秒产出一个数据
			time.Sleep(1 * time.Second)

			p.taskChannels[num] <- Message{id: num}
		}
	}
}

func (p *Producer) GoProduce(name string, goNum int) {
	for i := 0; i < goNum; i++ {
		p.wg.Add(1)
		go func(goName string) {
			p.Produce(goName)
		}(name + "-" + strconv.Itoa(i))
	}
}

/*
   消费者
*/
type Consumer struct {
	taskChannel []chan Message
	wg          *sync.WaitGroup
}

func NewConsumer(taskChannel []chan Message, wg *sync.WaitGroup) *Consumer {
	c := &Consumer{taskChannel: taskChannel,
		wg: wg}
	return c
}

func (c *Consumer) Consume(name string) {
	channelClose := make(map[int]bool, len(c.taskChannel))
	random := rand.New(rand.NewSource(1024))
	for {
		index := random.Intn(len(c.taskChannel))
		select {
		case task, ok := <-c.taskChannel[index]:
			// 模拟消费者处理数据
			if ok {
				log.Printf("%s consumer  deal: %d", name, task.id)
			} else {
				// 每个生产者都去确认自己所消费的channel都已经关闭
				channelClose[index] = true
				if len(channelClose) == len(c.taskChannel) {
					// 检查所持有的所有channel已经关闭
					// 数据消费完成，且channel关闭的时候，通知主线程消费者处理管道中的数据完成
					log.Printf("%s consumer done ", name)
					goto flag
				}
			}
		default:
			// 当存在多个channel的时候，避免等待一个一直阻塞，尝试获取其他channel
		}
	}
	// 跳出多层循环
flag:
	// 消费者标识所有消息已经处理完成
	defer c.wg.Done()
}

func (c *Consumer) GoConsume(name string, goNum int) {
	for i := 0; i < goNum; i++ {
		c.wg.Add(1)
		go func(goName string) {
			c.Consume(goName)
		}(name + "-" + strconv.Itoa(i))
	}
}

/*
	可以将生产和消费模型简单分类：每类可以使用相同的方法创建
	1.所有生产者和channel和消费者之间全连接
	2.生产者不和所有channel全连接，消费者和所有channel全连接
	3.消费者不和所有channel全连接，生产者和所有channel全连接
*/
func main() {
	log.Print("main start")
	parent := context.Background()
	wc, cancelFunc := context.WithCancel(parent)

	go func() {
		signals := make(chan os.Signal)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
		// 这个协程 就阻塞这里等待结果
		<-signals
		// 监听进程kill信号，通知produce停止产出
		log.Printf("start deal kill signal")
		cancelFunc()
		log.Printf("end deal kill signal")
	}()

	// 用于判断所有消费者处理完成
	wg := &sync.WaitGroup{}

	// 生产者的数量
	producerGoNum := 1

	// 管道的数量
	channelNum := 1

	// 消费者的的数量
	consumerGoNum := 1

	// new生产者的个数
	newProducerNum := 3

	// 创建生产者
	producers := make([]*Producer, newProducerNum)
	// all task channel
	var allTaskChannel []chan Message
	for i := 0; i < newProducerNum; i++ {
		// channel创建
		taskChannels := make([]chan Message, channelNum)
		for i := range taskChannels {
			taskChannels[i] = make(chan Message)
			allTaskChannel = append(allTaskChannel, taskChannels[i])
		}
		// 生产者创建
		producers[i] = NewProducer(taskChannels, wc)
	}

	// 消费者创建
	consumer := NewConsumer(allTaskChannel, wg)
	// 启动消费者
	consumer.GoConsume("c", consumerGoNum)

	// 启动生产者
	for i, producer := range producers {
		// 启动生产者
		producer.GoProduce("p"+strconv.Itoa(i), producerGoNum)
	}

	// 等待consumer完成
	wg.Wait()
	log.Print("main end")
}
